CDKを使用して SQSでLambda Destinations を作成してみた。
概要
Lambda Destinationsで非同期呼び出しのレコードを別のサービスに送信することができます。処理に失敗したイベントと正常に処理されたイベントに別々の送信先を設定できます。
Lambda Destinations :
- Amazon SQS
- Amazon SNS
- Lambda
- EventBridge
この記事では、CDKを使用してSQSでLambda Destinations を作成してみました。ここでは、オブジェクトがS3バケットにアップロードされたときにLambdaをトリガーするS3イベントソースを作成します。次に、Lambda関数は成功イベントと失敗イベントをSQSキューに送信します。
やってみた
CDKアプリの作成
CDKをインストールする
- 次のコマンドを使用してCDKをインストールしておきます。
npm install aws-cdk-lib
CDKアプリを作成する
- 新しいディレクトリを作成しておきます。
- CDKは、プロジェクトディレクトリの名前に基づいてソースファイルとクラスに名前を付けます。
#create new directory mkdir lambda-destinations cd lambda-destinations
- cdk initコマンドを使用してアプリを初期化しておきます。
cdk init --language typescript
Lambda関数の作成
- プロジェクトのメインディレクトリにresourcesディレクトリを作成しておきます。
mkdir resources
- resourcesディレクトリに次のPythonファイルを作成しておきます。[resources/lambda-handler.py]
def handler(event, context): #To test successful events return { 'statusCode': 200, 'body': "SUCCESS" } #To test failed Events #raise Exception("Failed")
AWS サービスの作成
- 新しいファイル [lib/index.ts] を作成して、作成する必要のあるAWSサービスを定義しておきます。
- ファイルに次のAWSサービスを定義しておきます。
- S3 Bucket : LambdaFunctionのイベントソースとしてS3バケットを作成します。
- SQS : 2つのSQSキュー。
- Success キュー : 成功イベントは成功キューに送信されます。
- Failure キュー : 失敗したイベントは失敗キューに送信されます。
- Lambda 関数 : DestinationとしてSQS、イベントソースとしてS3を使用するLambda関数。
import { Construct } from 'constructs'; import { Runtime, Function, AssetCode } from 'aws-cdk-lib/aws-lambda'; import { StackProps, RemovalPolicy} from 'aws-cdk-lib'; import { Queue } from 'aws-cdk-lib/aws-sqs'; import { SqsDestination} from 'aws-cdk-lib/aws-lambda-destinations'; import { Bucket,EventType } from 'aws-cdk-lib/aws-s3'; import { S3EventSource } from 'aws-cdk-lib/aws-lambda-event-sources'; export class lambdaDestinationsStack extends Construct { constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id); const bucket = new Bucket(this, 'bucket', { autoDeleteObjects: true, removalPolicy: RemovalPolicy.DESTROY }); const eventSource = new S3EventSource(bucket, { events: [ EventType.OBJECT_CREATED_PUT ] }); const SuccessQueue = new Queue(this, 'SuccessQueue'); const FailureQueue = new Queue(this, 'FailureQueue'); const lambdaFunction = new Function(this, "lambda-sqs", { code: new AssetCode("resorces"), handler: "lambda-handler.handler", runtime: Runtime.PYTHON_3_9, functionName: "lambda-destinations-sqs", onSuccess: new SqsDestination(SuccessQueue), onFailure: new SqsDestination(FailureQueue), }); lambdaFunction.addEventSource(eventSource); } }
アプリにサービスを追加する
- /lib/lambda-destinations-stack.ts ファイルに次のコードを追加しておきます。
#Import the Index file created in the previous step import * as dest from '../lib/index'; new dest.lambdaDestinationsStack(this, 'lambda-destinations');
CDK Deploy
- Deploy する前に、環境をブートストラップする必要があります。
- 次のコマンドを実行して、AWS環境をブートストラップしておきます。
cdk bootstrap
- CDKを展開しておきます。
cdk deploy
- コンソールでは、サービスが作成されたことを見ることができます。
Lambda Function
SQS Queue
Lambda Destinations
Destinationsをテストする
On success
- 成功イベントをテストするには、Lambdaコードを変更しておきます。
def handler(event, context): return { 'statusCode': 200, 'body': "SUCCESS" }
- S3バケットにファイルをアップロードしておきます。ファイルをS3バケットにアップロードすると、Lambda関数の成功イベントが成功キューに送信されます。これで、成功メッセージがキューにあることをみることができます。
- メッセージをポーリングしてみることができます。
On Failure
- 失敗イベントをテストするには、Lambdaコードを変更しておきます。
def handler(event, context): raise Exception("Failed")
- S3バケットにファイルをアップロードしておきます。
- 失敗したイベントが失敗キューに送信されたことをみることができます。
まとめ
CDKを使用して SQSでLambda Destinations を作成してみました。他のDestinations[SNS, EventBridge, Lambda]でLambda Destinationsを試すことができます。
Reference: